using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Client.Monitoring;
using EventStore.Core.Bus;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using Grpc.Core;

namespace EventStore.Core.Services.Transport.Grpc {
	internal partial class Monitoring : EventStore.Client.Monitoring.Monitoring.MonitoringBase {
		private readonly IPublisher _publisher;
		
		public override Task Stats(StatsReq request, IServerStreamWriter<StatsResp> responseStream, ServerCallContext context) {
			var channel = Channel.CreateBounded<StatsResp>(new BoundedChannelOptions(1) {
				SingleReader = true,
				SingleWriter = true
			});

			_ = Receive();

			return channel.Reader.ReadAllAsync(context.CancellationToken)
				.ForEachAwaitAsync(responseStream.WriteAsync, context.CancellationToken);
			
			async Task Receive() {
				var delay = TimeSpan.FromMilliseconds(request.RefreshTimePeriodInMs);
				var envelope = new CallbackEnvelope(message => {
					if (message is not MonitoringMessage.GetFreshStatsCompleted completed) {
						channel.Writer.TryComplete(UnknownMessage<MonitoringMessage.GetFreshStatsCompleted>(message));
						return;
					}

					var response = new StatsResp();

					foreach (var (key, value) in completed.Stats.Where(stat => stat.Value is not null)) {
						response.Stats.Add(key, value.ToString());
					}

					channel.Writer.TryWrite(response);

				});
				while (!context.CancellationToken.IsCancellationRequested) {
					_publisher.Publish(
						new MonitoringMessage.GetFreshStats(envelope, x => x, request.UseMetadata, false));

					await Task.Delay(delay, context.CancellationToken).ConfigureAwait(false);
				}
			}
		}

		public Monitoring(IPublisher publisher) {
			_publisher = publisher;
		}

		private static Exception UnknownMessage<T>(Message message) where T : Message =>
			new RpcException(
				new Status(StatusCode.Unknown,
					$"Envelope callback expected {typeof(T).Name}, received {message.GetType().Name} instead"));
	}
}
